// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*!
 * @file calculatorClient.cxx
 * Client implementation for interfaces
 *
 * This file was generated by the tool fastddsgen (version: 4.1.0).
 */

#include "calculatorClient.hpp"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/qos/RequesterQos.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/rpc/exceptions.hpp>
#include <fastdds/dds/rpc/interfaces.hpp>
#include <fastdds/dds/rpc/RequestInfo.hpp>
#include <fastdds/dds/rpc/Requester.hpp>
#include <fastdds/dds/rpc/Service.hpp>
#include <fastdds/dds/rpc/ServiceTypeSupport.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/rtps/common/Guid.hpp>
#include <fastdds/rtps/common/WriteParams.hpp>

#include "calculator.hpp"
#include "calculator_details.hpp"
#include "calculatorPubSubTypes.hpp"

namespace calculator_example {

//{ interface Calculator

namespace detail {

namespace fdds = eprosima::fastdds::dds;
namespace frpc = eprosima::fastdds::dds::rpc;
namespace frtps = eprosima::fastdds::rtps;

class CalculatorClient : public Calculator
{

    using RequestType = Calculator_Request;
    using ReplyType = Calculator_Reply;

public:

    /*!
     * @brief Build a client bound to the given service name.
     *
     * Registers the service type on the participant, creates the service and its requester,
     * and finally launches the thread that processes incoming replies.
     *
     * As the destructor is not executed when a constructor throws, every entity created so far
     * is released here before an exception leaves the constructor.
     *
     * @param part          Participant used to create (and later delete) the service and the requester.
     * @param service_name  Name given to the created service.
     * @param qos           QoS applied to the created requester.
     *
     * @throws std::runtime_error if the type registration, the service creation or the requester
     *         creation fails. Any exception raised while starting the thread is propagated as is.
     */
    CalculatorClient(
            fdds::DomainParticipant& part,
            const char* service_name,
            const fdds::RequesterQos& qos)
        : Calculator()
        , participant_(part)
    {
        // Register the service type support
        auto service_type = create_Calculator_service_type_support();
        auto ret = service_type.register_service_type(&participant_, "calculator_example::Calculator");
        if (ret != fdds::RETCODE_OK)
        {
            throw std::runtime_error("Error registering service type");
        }

        // Create the service
        service_ = participant_.create_service(service_name, "calculator_example::Calculator");
        if (nullptr == service_)
        {
            throw std::runtime_error("Error creating service");
        }

        // Create the requester
        requester_ = participant_.create_service_requester(service_, qos);
        if (nullptr == requester_)
        {
            // Do not leak the service that was just created
            participant_.delete_service(service_);
            service_ = nullptr;
            throw std::runtime_error("Error creating requester");
        }

        // Start the processing thread
        try
        {
            start_thread();
        }
        catch (...)
        {
            // Thread creation failed: undo the entity creation in reverse order and propagate
            participant_.delete_service_requester(service_->get_service_name(), requester_);
            requester_ = nullptr;
            participant_.delete_service(service_);
            service_ = nullptr;
            throw;
        }
    }

    /*!
     * @brief Stop the reply processing thread and release the requester and the service.
     */
    ~CalculatorClient() override
    {
        // The worker thread uses the requester, so it is joined before anything is deleted
        stop_thread();

        // Release the requester first ...
        if (requester_ != nullptr)
        {
            participant_.delete_service_requester(service_->get_service_name(), requester_);
            requester_ = nullptr;
        }

        // ... and then the service it was created from
        if (service_ != nullptr)
        {
            participant_.delete_service(service_);
            service_ = nullptr;
        }
    }

private:

    /*!
     * @brief Clear the stop flag and launch the thread executing run().
     */
    void start_thread()
    {
        stop_thread_.store(false);
        processing_thread_ = std::thread(&CalculatorClient::run, this);
    }

    /*!
     * @brief Raise the stop flag and wait for the processing thread (if any) to finish.
     */
    void stop_thread()
    {
        stop_thread_.store(true);

        // Nothing to wait for when the thread was never started or was already joined
        if (!processing_thread_.joinable())
        {
            return;
        }
        processing_thread_.join();
    }

    void run()
    {
        while (!stop_thread_)
        {
            // Wait for a reply
            if (requester_->get_requester_reader()->wait_for_unread_message({ 0, 100000000 }))
            {
                // Take and process the reply
                frpc::RequestInfo req_info;
                ReplyType reply;
                auto ret = requester_->take_reply(&reply, req_info);
                if (ret == fdds::RETCODE_OK)
                {
                    process_reply(reply, req_info);
                }
            }
        }
    }

    /*!
     * @brief Dispatch a received reply to the pending operation it belongs to.
     *
     * The pending operation is looked up by the related sample identity of the reply.
     * Replies without a matching entry are ignored. The entry is dropped when its
     * processor reports that it should be removed.
     *
     * @param reply     Reply taken from the requester.
     * @param req_info  Information received along with the reply.
     */
    void process_reply(
            const ReplyType& reply,
            const frpc::RequestInfo& req_info)
    {
        const auto request_id = req_info.related_sample_identity;

        std::lock_guard<std::mutex> guard(mtx_);
        auto entry = pending_results_.find(request_id);
        if (pending_results_.end() == entry)
        {
            return;
        }

        bool operation_done = false;
        entry->second->process_reply(reply, req_info, operation_done);
        if (operation_done)
        {
            pending_results_.erase(entry);
        }
    }

    /*!
     * Interface of the objects stored in pending_results_.
     * Each instance represents one request that is waiting for replies.
     */
    struct IReplyProcessor
    {
        //! Information of the request; its related_sample_identity is the key used in pending_results_
        frpc::RequestInfo info;

        virtual ~IReplyProcessor() = default;

        /*!
         * @brief Process a reply received for this request.
         *
         * @param reply          Reply taken from the requester.
         * @param req_info       Information received along with the reply.
         * @param should_remove  Output flag. When set to true, the caller erases this
         *                       processor from pending_results_.
         */
        virtual void process_reply(
                const ReplyType& reply,
                const frpc::RequestInfo& req_info,
                bool& should_remove) = 0;

    };

    //! Participant used to create and delete the service and the requester
    fdds::DomainParticipant& participant_;
    //! Service created in the constructor and deleted in the destructor
    frpc::Service* service_ = nullptr;
    //! Requester used to send requests and take replies
    frpc::Requester* requester_ = nullptr;
    //! Flag checked by run() on every iteration; set to true by stop_thread()
    std::atomic<bool> stop_thread_{false};
    //! Thread executing run()
    std::thread processing_thread_;
    //! Protects pending_results_
    std::mutex mtx_;
    //! Requests waiting for replies, indexed by the sample identity of the request
    std::map<frtps::SampleIdentity, std::shared_ptr<IReplyProcessor>> pending_results_;

    //{ request helpers

    /*!
     * @brief Send a request whose result is delivered through a promise.
     *
     * The request is sent and registered in pending_results_ while holding mtx_.
     * As process_reply() takes the same mutex before looking up the request, a reply
     * arriving right after the send cannot be dropped for not being registered yet.
     *
     * @tparam T        Reply processor type. It should have an accessible `info` member.
     * @tparam TResult  Type of the value held by the promise.
     *
     * @param request  Request to send.
     * @param result   Reply processor to register for the request.
     * @param promise  Promise from which the returned future is obtained. When the request
     *                 cannot be sent, an RpcBrokenPipeException is stored on it.
     *
     * @return The future associated to @c promise.
     */
    template<typename T, typename TResult>
    std::future<TResult> send_request_with_promise(
            const RequestType& request,
            std::shared_ptr<T> result,
            std::promise<TResult>& promise)
    {
        result->info.related_sample_identity.writer_guid(requester_->get_requester_reader()->guid());

        bool sent = false;
        {
            // Send and register atomically with respect to the reply processing thread
            std::lock_guard<std::mutex> _ (mtx_);
            sent = (fdds::RETCODE_OK ==
                    requester_->send_request(const_cast<void*>(static_cast<const void*>(&request)), result->info));
            if (sent)
            {
                pending_results_[result->info.related_sample_identity] = result;
            }
        }

        if (!sent)
        {
            promise.set_exception(
                std::make_exception_ptr(frpc::RpcBrokenPipeException(false)));
        }

        return promise.get_future();
    }

    /*!
     * @brief Send a request whose results are delivered through a reader object.
     *
     * The request is sent and registered in pending_results_ while holding mtx_.
     * As process_reply() takes the same mutex before looking up the request, a reply
     * arriving right after the send cannot be dropped for not being registered yet.
     *
     * @tparam T  Reader type. It should have an accessible `info` member and a
     *            `set_exception(std::exception_ptr)` method.
     *
     * @param request  Request to send.
     * @param result   Reader to register for the request. When the request cannot be sent,
     *                 an RpcBrokenPipeException is stored on it.
     *
     * @return The same pointer received in @c result.
     */
    template<typename T>
    std::shared_ptr<T> send_request_with_reader(
            const RequestType& request,
            std::shared_ptr<T> result)
    {
        result->info.related_sample_identity.writer_guid(requester_->get_requester_reader()->guid());

        bool sent = false;
        {
            // Send and register atomically with respect to the reply processing thread
            std::lock_guard<std::mutex> _ (mtx_);
            sent = (fdds::RETCODE_OK ==
                    requester_->send_request(const_cast<void*>(static_cast<const void*>(&request)), result->info));
            if (sent)
            {
                pending_results_[result->info.related_sample_identity] = result;
            }
        }

        if (!sent)
        {
            // Reported outside the lock, as the reader only needs its own synchronization
            result->set_exception(
                std::make_exception_ptr(frpc::RpcBrokenPipeException(false)));
        }

        return result;
    }

    //} request helpers

    //{ reply helpers

    template<typename T>
    static void set_invalid_reply(
            T& exception_receiver)
    {
        exception_receiver.set_exception(
            std::make_exception_ptr(frpc::RemoteInvalidArgumentError("An invalid reply was received")));
    }

    /*!
     * @brief Translate a remote exception code into an exception stored on the receiver.
     *
     * @param exception_receiver  Object on which `set_exception` is called.
     * @param exception           Code received in the `remoteEx` field of the reply.
     */
    template<typename T>
    static void set_remote_exception(
            T& exception_receiver,
            const frpc::RemoteExceptionCode_t& exception)
    {
        // A reply carrying a remote exception field with the OK code is treated as invalid
        if (frpc::RemoteExceptionCode_t::REMOTE_EX_OK == exception)
        {
            set_invalid_reply(exception_receiver);
            return;
        }

        std::exception_ptr error;
        switch (exception)
        {
            case frpc::RemoteExceptionCode_t::REMOTE_EX_UNSUPPORTED:
                error = std::make_exception_ptr(frpc::RemoteUnsupportedError());
                break;
            case frpc::RemoteExceptionCode_t::REMOTE_EX_INVALID_ARGUMENT:
                error = std::make_exception_ptr(frpc::RemoteInvalidArgumentError());
                break;
            case frpc::RemoteExceptionCode_t::REMOTE_EX_OUT_OF_RESOURCES:
                error = std::make_exception_ptr(frpc::RemoteOutOfResourcesError());
                break;
            case frpc::RemoteExceptionCode_t::REMOTE_EX_UNKNOWN_OPERATION:
                error = std::make_exception_ptr(frpc::RemoteUnknownOperationError());
                break;
            default: // REMOTE_EX_UNKNOWN_EXCEPTION
                error = std::make_exception_ptr(frpc::RemoteUnknownExceptionError());
                break;
        }
        exception_receiver.set_exception(error);
    }

    /*!
     * @brief Count how many of the optional fields of a reply hold a value.
     *
     * @param reply  Reply to inspect.
     *
     * @return Number of fields for which `has_value()` is true.
     */
    static size_t count_reply_fields(
            const ReplyType& reply)
    {
        const bool field_present[] =
        {
            reply.representation_limits.has_value(),
            reply.addition.has_value(),
            reply.subtraction.has_value(),
            reply.fibonacci_seq.has_value(),
            reply.sum_all.has_value(),
            reply.accumulator.has_value(),
            reply.filter.has_value(),
            reply.remoteEx.has_value()
        };

        size_t total = 0;
        for (const bool is_set : field_present)
        {
            if (is_set)
            {
                ++total;
            }
        }
        return total;
    }

    /*!
     * @brief Check that a reply has exactly one field set.
     *
     * @param promise  Promise on which an invalid reply error is stored when the check fails.
     * @param reply    Reply to validate.
     *
     * @return true when exactly one field of the reply holds a value, false otherwise.
     */
    template<typename T>
    static bool validate_reply(
            std::promise<T>& promise,
            const ReplyType& reply)
    {
        if (1u == count_reply_fields(reply))
        {
            return true;
        }

        // Either no field or more than one field is present
        set_invalid_reply(promise);
        return false;
    }

    /*!
     * Interface of the objects that can be notified of an error through an exception pointer.
     */
    struct IExceptionHolder
    {
        virtual ~IExceptionHolder() = default;

        /*!
         * @brief Hand an exception over to the holder.
         *
         * @param exception  Exception to hand over.
         */
        virtual void set_exception(
                std::exception_ptr exception) = 0;
    };

    /*!
     * @brief Check that a reply has exactly one field set.
     *
     * @param reader  Holder on which an invalid reply error is stored when the check fails.
     * @param reply   Reply to validate.
     *
     * @return true when exactly one field of the reply holds a value, false otherwise.
     */
    static bool validate_reply(
            IExceptionHolder& reader,
            const ReplyType& reply)
    {
        if (1u == count_reply_fields(reply))
        {
            return true;
        }

        // Either no field or more than one field is present
        set_invalid_reply(reader);
        return false;
    }

    //} reply helpers

    //{ operation representation_limits

public:

    /*!
     * @brief Send a representation_limits request.
     *
     * @return Future obtained from the promise registered for the request.
     */
    eprosima::fastdds::dds::rpc::RpcFuture<calculator_example::detail::Calculator_representation_limits_Out> representation_limits(
            ) override
    {
        // The operation has no input arguments, so an empty input structure is sent
        RequestType request;
        request.representation_limits = calculator_example::detail::Calculator_representation_limits_In{};

        // The reply processor owns the promise backing the returned future
        auto pending = std::make_shared<representation_limits_promise>();
        return send_request_with_promise(request, pending, pending->promise);
    }

private:

    /*!
     * Reply processor for the representation_limits operation.
     * Fulfills its promise with the result, or with an exception, taken from the reply.
     */
    struct representation_limits_promise : public IReplyProcessor
    {
        //! Promise from which the future returned to the user is obtained
        std::promise<calculator_example::detail::Calculator_representation_limits_Out> promise;

        void process_reply(
                const ReplyType& reply,
                const frpc::RequestInfo& req_info,
                bool& should_remove) override
        {
            // Only a reply related to this request completes the operation
            should_remove = !(req_info.related_sample_identity != info.related_sample_identity);
            if (!should_remove)
            {
                return;
            }

            if (!validate_reply(promise, reply))
            {
                // The error has already been stored in the promise
                return;
            }

            if (reply.remoteEx.has_value())
            {
                set_remote_exception(promise, reply.remoteEx.value());
            }
            else if (reply.representation_limits.has_value() &&
                    reply.representation_limits.value().result.has_value())
            {
                promise.set_value(reply.representation_limits.value().result.value());
            }
            else
            {
                // The reply is for another operation, or carries no result
                set_invalid_reply(promise);
            }
        }

    };



    //} operation representation_limits
 
    //{ operation addition

public:

    /*!
     * @brief Send an addition request.
     *
     * @param value1  First input argument of the operation.
     * @param value2  Second input argument of the operation.
     *
     * @return Future obtained from the promise registered for the request.
     */
    eprosima::fastdds::dds::rpc::RpcFuture<int32_t> addition(
            /*in*/ int32_t value1,
            /*in*/ int32_t value2) override
    {
        // Fill the request with the input arguments
        RequestType request;
        request.addition = calculator_example::detail::Calculator_addition_In{};
        request.addition->value1 = value1;
        request.addition->value2 = value2;

        // The reply processor owns the promise backing the returned future
        auto pending = std::make_shared<addition_promise>();
        return send_request_with_promise(request, pending, pending->promise);
    }

private:

    /*!
     * Reply processor for the addition operation.
     * Fulfills its promise with the result, or with an exception, taken from the reply.
     */
    struct addition_promise : public IReplyProcessor
    {
        //! Promise from which the future returned to the user is obtained
        std::promise<int32_t> promise;

        void process_reply(
                const ReplyType& reply,
                const frpc::RequestInfo& req_info,
                bool& should_remove) override
        {
            // Only a reply related to this request completes the operation
            should_remove = !(req_info.related_sample_identity != info.related_sample_identity);
            if (!should_remove)
            {
                return;
            }

            if (!validate_reply(promise, reply))
            {
                // The error has already been stored in the promise
                return;
            }

            const bool for_this_operation = reply.addition.has_value();
            if (reply.remoteEx.has_value())
            {
                set_remote_exception(promise, reply.remoteEx.value());
            }
            else if (for_this_operation && reply.addition.value().result.has_value())
            {
                promise.set_value(reply.addition.value().result.value().return_);
            }
            else if (for_this_operation &&
                    reply.addition.value().calculator_example_OverflowException_ex.has_value())
            {
                promise.set_exception(
                    std::make_exception_ptr(
                        reply.addition.value().calculator_example_OverflowException_ex.value()));
            }
            else
            {
                // The reply is for another operation, or carries neither result nor exception
                set_invalid_reply(promise);
            }
        }

    };



    //} operation addition
 
    //{ operation subtraction

public:

    /*!
     * @brief Send a subtraction request.
     *
     * @param value1  First input argument of the operation.
     * @param value2  Second input argument of the operation.
     *
     * @return Future obtained from the promise registered for the request.
     */
    eprosima::fastdds::dds::rpc::RpcFuture<int32_t> subtraction(
            /*in*/ int32_t value1,
            /*in*/ int32_t value2) override
    {
        // Fill the request with the input arguments
        RequestType request;
        request.subtraction = calculator_example::detail::Calculator_subtraction_In{};
        request.subtraction->value1 = value1;
        request.subtraction->value2 = value2;

        // The reply processor owns the promise backing the returned future
        auto pending = std::make_shared<subtraction_promise>();
        return send_request_with_promise(request, pending, pending->promise);
    }

private:

    /*!
     * Reply processor for the subtraction operation.
     * Fulfills its promise with the result, or with an exception, taken from the reply.
     */
    struct subtraction_promise : public IReplyProcessor
    {
        //! Promise from which the future returned to the user is obtained
        std::promise<int32_t> promise;

        void process_reply(
                const ReplyType& reply,
                const frpc::RequestInfo& req_info,
                bool& should_remove) override
        {
            // Only a reply related to this request completes the operation
            should_remove = !(req_info.related_sample_identity != info.related_sample_identity);
            if (!should_remove)
            {
                return;
            }

            if (!validate_reply(promise, reply))
            {
                // The error has already been stored in the promise
                return;
            }

            const bool for_this_operation = reply.subtraction.has_value();
            if (reply.remoteEx.has_value())
            {
                set_remote_exception(promise, reply.remoteEx.value());
            }
            else if (for_this_operation && reply.subtraction.value().result.has_value())
            {
                promise.set_value(reply.subtraction.value().result.value().return_);
            }
            else if (for_this_operation &&
                    reply.subtraction.value().calculator_example_OverflowException_ex.has_value())
            {
                promise.set_exception(
                    std::make_exception_ptr(
                        reply.subtraction.value().calculator_example_OverflowException_ex.value()));
            }
            else
            {
                // The reply is for another operation, or carries neither result nor exception
                set_invalid_reply(promise);
            }
        }

    };



    //} operation subtraction
 
    //{ operation fibonacci_seq

public:

    /*!
     * @brief Send a fibonacci_seq request.
     *
     * @param n_results  Input argument of the operation.
     *
     * @return Reader from which the values received for the request can be read.
     */
    std::shared_ptr<eprosima::fastdds::dds::rpc::RpcClientReader<int32_t> > fibonacci_seq(
            /*in*/ uint32_t n_results) override
    {
        // Fill the request with the input argument
        RequestType request;
        request.fibonacci_seq = calculator_example::detail::Calculator_fibonacci_seq_In{};
        request.fibonacci_seq->n_results = n_results;

        // The reader both processes the replies and exposes the values to the user
        auto feed_reader = std::make_shared<fibonacci_seq_reader>(requester_);
        return send_request_with_reader(request, feed_reader);
    }

private:

    /*!
     * Reader for the values returned by the fibonacci_seq operation.
     *
     * Replies are pushed by the processing thread through process_reply(), and consumed
     * by the user through read(). Queue, exception and finished flag are protected by mtx_.
     */
    struct fibonacci_seq_reader
        : public frpc::RpcClientReader<int32_t>
        , public IReplyProcessor
        , public IExceptionHolder
    {
        /*!
         * @param requester  Requester used to send the cancellation request.
         */
        fibonacci_seq_reader(
                frpc::Requester* requester)
            : requester_(requester)
        {
        }

        /*!
         * @brief Process a reply received for the request of this reader.
         *
         * @param reply          Reply taken from the requester.
         * @param req_info       Information received along with the reply.
         * @param should_remove  Set to false when the reply is ignored or carries a value
         *                       (more replies are expected), and to true otherwise.
         */
        void process_reply(
                const ReplyType& reply,
                const frpc::RequestInfo& req_info,
                bool& should_remove) override
        {
            should_remove = false;
            // Check if the reply is for this operation
            if (req_info.related_sample_identity != info.related_sample_identity)
            {
                return;
            }

            // Avoid processing replies from different writers
            if (frtps::GUID_t::unknown() == writer_id_)
            {
                // The writer of the first reply is the only one accepted from now on
                writer_id_ = req_info.sample_identity.writer_guid();
            }
            else if (writer_id_ != req_info.sample_identity.writer_guid())
            {
                return;
            }

            should_remove = true;
            if (!validate_reply(*this, reply))
            {
                return;
            }

            if (reply.remoteEx.has_value())
            {
                set_remote_exception(*this, reply.remoteEx.value());
                return;
            }

            if (reply.fibonacci_seq.has_value())
            {
                const auto& result = reply.fibonacci_seq.value();
                if (result.result.has_value())
                {
                    const auto& out = result.result.value();
                    {
                        if (out.finished_.has_value())
                        {
                            // Store the finished value
                            std::lock_guard<std::mutex> _(mtx_);
                            finished_ = true;
                            cv_.notify_all();
                        }
                        else if (out.return_.has_value())
                        {
                            // Values received after a cancellation are discarded
                            if (!cancelled_)
                            {
                                // Store the return value
                                std::lock_guard<std::mutex> _(mtx_);
                                queue_.push(out.return_.value());
                                cv_.notify_all();
                            }

                            // Should be kept in order to receive further values
                            should_remove = false;
                        }
                        else
                        {
                            set_invalid_reply(*this);
                        }
                    }
                    return;
                }
                if (result.calculator_example_OverflowException_ex.has_value())
                {
                    (*this).set_exception(
                        std::make_exception_ptr(result.calculator_example_OverflowException_ex.value()));
                    return;
                }
            }

            // If we reach this point, the reply is for another operation
            set_invalid_reply(*this);
        }

        /*!
         * @brief Store an exception and mark the operation as finished.
         *
         * Does nothing when the operation was already finished.
         *
         * @param exception  Exception to be rethrown by a later call to read().
         */
        void set_exception(
                std::exception_ptr exception) override
        {
            std::lock_guard<std::mutex> _(mtx_);
            if (!finished_)
            {
                exception_ = exception;
                finished_ = true;
                cv_.notify_all();
            }
        }

        /*!
         * @brief Block until a value is available, or the operation is finished or cancelled.
         *
         * @param value  Receives the value read, when there is one.
         *
         * @return true when a value was read, false when the operation was finished or cancelled.
         *
         * Rethrows the exception stored by set_exception(), if any, once the queue is empty.
         */
        bool read(
                int32_t& value) override
        {
            bool ret_val = false;
            std::unique_lock<std::mutex> lock(mtx_);
            while (!try_read(value, ret_val))
            {
                cv_.wait(lock);
            }
            return ret_val;
        }

        /*!
         * @brief Same as read(int32_t&), but waiting at most for the given timeout.
         *
         * @param value    Receives the value read, when there is one.
         * @param timeout  Maximum time to wait.
         *
         * @return true when a value was read, false when the operation was finished or cancelled.
         *
         * @throws frpc::RpcTimeoutException when the timeout elapses before any of the above happens.
         *
         * Rethrows the exception stored by set_exception(), if any, once the queue is empty.
         */
        bool read(
                int32_t& value,
                const fdds::Duration_t& timeout) override
        {
            bool ret_val = false;
            std::unique_lock<std::mutex> lock(mtx_);
            std::chrono::steady_clock::time_point end_time =
                std::chrono::steady_clock::now() +
                std::chrono::seconds(timeout.seconds) +
                std::chrono::nanoseconds(timeout.nanosec);
            while (!try_read(value, ret_val))
            {
                if (std::cv_status::timeout == cv_.wait_until(lock, end_time))
                {
                    // Once the deadline has passed, wait_until returns immediately, so looping
                    // again would spin forever. Check once more for something that arrived while
                    // the lock was being re-acquired, and report the timeout otherwise.
                    if (try_read(value, ret_val))
                    {
                        break;
                    }
                    throw frpc::RpcTimeoutException();
                }
            }
            return ret_val;
        }

        /*!
         * @brief Cancel the operation.
         *
         * Only the first call has effect. It wakes up the pending read() calls, sends a
         * cancellation request, and blocks until the operation is marked as finished.
         *
         * @throws frpc::RpcTimeoutException when sending the cancellation request times out.
         * @throws frpc::RpcBrokenPipeException when the cancellation request cannot be sent.
         */
        void cancel() override
        {
            bool old_cancelled = false;
            {
                // The flag is changed while holding the mutex, so a reader that has just
                // evaluated try_read() is already waiting when the notification below is sent
                std::lock_guard<std::mutex> _(mtx_);
                old_cancelled = cancelled_.exchange(true);
            }

            if (!old_cancelled)
            {
                // Notify read calls that the operation was cancelled
                cv_.notify_all();

                // Communicate the cancellation to the server and wait for it to be acknowledged
                RequestType request;
                request.feed_cancel_ = true;
                frpc::RequestInfo req_info = info;
                auto ret = requester_->send_request((void*)&request, req_info);
                if (ret != fdds::RETCODE_OK)
                {
                    if (ret == fdds::RETCODE_TIMEOUT)
                    {
                        throw frpc::RpcTimeoutException();
                    }
                    else
                    {
                        throw frpc::RpcBrokenPipeException(false);
                    }
                }

                // Wait for the server to acknowledge the cancellation
                std::unique_lock<std::mutex> lock(mtx_);
                while (!finished_)
                {
                    cv_.wait(lock);
                }
            }
        }

    private:

        /*!
         * @brief Single, non-blocking attempt to complete a read. Called with mtx_ locked.
         *
         * @param value    Receives the value read, when there is one.
         * @param ret_val  Receives the value to be returned by read().
         *
         * @return true when read() should return, false when it should keep waiting.
         */
        bool try_read(
                int32_t& value,
                bool& ret_val)
        {
            // Early exit if the operation was cancelled
            if (cancelled_)
            {
                ret_val = false;
                return true;
            }

            // Retrieve one value from the queue
            if (!queue_.empty())
            {
                value = queue_.front();
                queue_.pop();
                ret_val = true;
                return true;
            }

            // Throw the exception if it was set
            if (exception_)
            {
                // The exception is reported only once
                auto ex = exception_;
                exception_ = nullptr;
                std::rethrow_exception(ex);
            }

            // Check if the operation was finished
            if (finished_)
            {
                ret_val = false;
                return true;
            }

            // Need to wait for a new value
            return false;
        }

        //! Requester used to send the cancellation request
        frpc::Requester* requester_ = nullptr;
        //! Whether cancel() has been called
        std::atomic<bool> cancelled_{false};
        //! Exception pending to be rethrown by read()
        std::exception_ptr exception_;
        //! Values received and not yet read
        std::queue<int32_t> queue_;
        //! Whether no more values are expected
        bool finished_ = false;
        //! Protects queue_, exception_ and finished_
        std::mutex mtx_;
        //! Notified on every change that may unblock read() or cancel()
        std::condition_variable cv_;
        //! Writer of the first reply processed; replies from other writers are ignored
        frtps::GUID_t writer_id_ = frtps::GUID_t::unknown();

    };


    //} operation fibonacci_seq
 
    //{ operation sum_all

public:

    /*!
     * @brief Send a sum_all request.
     *
     * @param value  Output. Receives the writer with which the input values of the
     *               operation are sent. It is assigned before the request is sent.
     *
     * @return Future obtained from the promise registered for the request.
     */
    eprosima::fastdds::dds::rpc::RpcFuture<int32_t> sum_all(
            /*in*/ std::shared_ptr<eprosima::fastdds::dds::rpc::RpcClientWriter<int32_t>>& value) override
    {
        // The reply processor owns the promise backing the returned future
        auto pending = std::make_shared<sum_all_promise>();

        // The initial request carries an empty input structure
        RequestType request;
        request.sum_all = calculator_example::detail::Calculator_sum_all_In{};

        // The writer shares the reply processor, from which it takes the request information
        value = std::make_shared<sum_all_value_writer>(requester_, pending);

        return send_request_with_promise(request, pending, pending->promise);
    }

private:

    /*!
     * Reply processor for the sum_all operation.
     * Fulfills its promise with the result, or with an exception, taken from the reply.
     */
    struct sum_all_promise : public IReplyProcessor
    {
        //! Promise from which the future returned to the user is obtained
        std::promise<int32_t> promise;

        void process_reply(
                const ReplyType& reply,
                const frpc::RequestInfo& req_info,
                bool& should_remove) override
        {
            // Only a reply related to this request completes the operation
            should_remove = !(req_info.related_sample_identity != info.related_sample_identity);
            if (!should_remove)
            {
                return;
            }

            if (!validate_reply(promise, reply))
            {
                // The error has already been stored in the promise
                return;
            }

            const bool for_this_operation = reply.sum_all.has_value();
            if (reply.remoteEx.has_value())
            {
                set_remote_exception(promise, reply.remoteEx.value());
            }
            else if (for_this_operation && reply.sum_all.value().result.has_value())
            {
                promise.set_value(reply.sum_all.value().result.value().return_);
            }
            else if (for_this_operation &&
                    reply.sum_all.value().calculator_example_OverflowException_ex.has_value())
            {
                promise.set_exception(
                    std::make_exception_ptr(
                        reply.sum_all.value().calculator_example_OverflowException_ex.value()));
            }
            else
            {
                // The reply is for another operation, or carries neither result nor exception
                set_invalid_reply(promise);
            }
        }

    };


    /*!
     * Writer for the `value` input of the sum_all operation.
     * Every call sends a new request, using a copy of the information of the initial request.
     */
    struct sum_all_value_writer : public frpc::RpcClientWriter<int32_t>
    {
        using FeedType = calculator_example::detail::Calculator_sum_all_value_Feed;

        /*!
         * @param requester  Requester used to send the requests.
         * @param result     Reply processor of the operation, from which the request
         *                   information is taken.
         */
        sum_all_value_writer(
                frpc::Requester* requester,
                std::shared_ptr<IReplyProcessor> result)
            : requester_(requester)
            , result_(result)
        {
        }

        /*!
         * @brief Send one value.
         *
         * @throws frpc::RpcTimeoutException or frpc::RpcBrokenPipeException when sending fails.
         */
        void write(
                const int32_t& value) override
        {
            RequestType feed_request;
            feed_request.sum_all_value = FeedType{};
            feed_request.sum_all_value->value = value;
            send_request(feed_request);
        }

        /*!
         * @brief Send one value. Behaves exactly as the overload taking a const reference.
         */
        void write(
                int32_t&& value) override
        {
            const int32_t& as_lvalue = value;
            write(as_lvalue);
        }

        /*!
         * @brief Send the `finished_` field set to the given reason, and no value.
         *
         * @throws frpc::RpcTimeoutException or frpc::RpcBrokenPipeException when sending fails.
         */
        void finish(
                frpc::RpcStatusCode reason = frpc::RPC_STATUS_CODE_OK) override
        {
            RequestType feed_request;
            feed_request.sum_all_value = FeedType{};
            feed_request.sum_all_value->finished_ = reason;
            send_request(feed_request);
        }

    private:

        /*!
         * @brief Send a request, translating a failure into an exception.
         */
        void send_request(
                const RequestType& request)
        {
            // Work on a copy, so the information stored in the reply processor is not modified
            frpc::RequestInfo feed_info = result_->info;
            const auto ret =
                    requester_->send_request(const_cast<void*>(static_cast<const void*>(&request)), feed_info);
            if (fdds::RETCODE_OK == ret)
            {
                return;
            }

            if (fdds::RETCODE_TIMEOUT == ret)
            {
                throw frpc::RpcTimeoutException();
            }
            throw frpc::RpcBrokenPipeException(false);
        }

        //! Requester used to send the requests
        frpc::Requester* requester_ = nullptr;
        //! Reply processor of the operation
        std::shared_ptr<IReplyProcessor> result_;
    };


    //} operation sum_all
 
    //{ operation accumulator

public:

    /*!
     * @brief Send an accumulator request.
     *
     * @param value  Output. Receives the writer with which the input values of the
     *               operation are sent. It is assigned before the request is sent.
     *
     * @return Reader from which the values received for the request can be read.
     */
    std::shared_ptr<eprosima::fastdds::dds::rpc::RpcClientReader<int32_t> > accumulator(
            /*in*/ std::shared_ptr<eprosima::fastdds::dds::rpc::RpcClientWriter<int32_t>>& value) override
    {
        // The reader both processes the replies and exposes the values to the user
        auto feed_reader = std::make_shared<accumulator_reader>(requester_);

        // The initial request carries an empty input structure
        RequestType request;
        request.accumulator = calculator_example::detail::Calculator_accumulator_In{};

        // The writer shares the reader, which is passed as its second constructor argument
        value = std::make_shared<accumulator_value_writer>(requester_, feed_reader);

        return send_request_with_reader(request, feed_reader);
    }

private:

    struct accumulator_reader
        : public frpc::RpcClientReader<int32_t>
        , public IReplyProcessor
        , public IExceptionHolder
    {
        accumulator_reader(
                frpc::Requester* requester)
            : requester_(requester)
        {
        }

        void process_reply(
                const ReplyType& reply,
                const frpc::RequestInfo& req_info,
                bool& should_remove) override
        {
            should_remove = false;
            // Check if the reply is for this operation
            if (req_info.related_sample_identity != info.related_sample_identity)
            {
                return;
            }

            // Avoid processing replies from different writers
            if (frtps::GUID_t::unknown() == writer_id_)
            {
                writer_id_ = req_info.sample_identity.writer_guid();
            }
            else if (writer_id_ != req_info.sample_identity.writer_guid())
            {
                return;
            }

            should_remove = true;
            if (!validate_reply(*this, reply))
            {
                return;
            }

            if (reply.remoteEx.has_value())
            {
                set_remote_exception(*this, reply.remoteEx.value());
                return;
            }

            if (reply.accumulator.has_value())
            {
                const auto& result = reply.accumulator.value();
                if (result.result.has_value())
                {
                    const auto& out = result.result.value();
                    {
                        if (out.finished_.has_value())
                        {
                            // Store the finished value
                            std::lock_guard<std::mutex> _(mtx_);
                            finished_ = true;
                            cv_.notify_all();
                        }
                        else if (out.return_.has_value())
                        {
                            if (!cancelled_)
                            {
                                // Store the return value
                                std::lock_guard<std::mutex> _(mtx_);
                                queue_.push(out.return_.value());
                                cv_.notify_all();
                            }

                            // Should be kept in order to receive further values
                            should_remove = false;
                        }
                        else
                        {
                            set_invalid_reply(*this);
                        }
                    }
                    return;
                }
                if (result.calculator_example_OverflowException_ex.has_value())
                {
                    (*this).set_exception(
                        std::make_exception_ptr(result.calculator_example_OverflowException_ex.value()));
                    return;
                }
            }

            // If we reach this point, the reply is for another operation
            set_invalid_reply(*this);
        }

        /**
         * Stores a failure for this operation and wakes up every thread waiting on it.
         *
         * The call is ignored when the operation has already been marked as finished,
         * so only the first terminal event (finish or exception) is kept.
         *
         * @param exception  Exception that try_read() will rethrow once the queue is drained.
         */
        void set_exception(
                std::exception_ptr exception) override
        {
            std::lock_guard<std::mutex> guard(mtx_);
            if (finished_)
            {
                // A terminal state was already recorded, keep it untouched
                return;
            }

            finished_ = true;
            exception_ = exception;
            cv_.notify_all();
        }

        bool read(
                int32_t& value) override
        {
            bool ret_val = false;
            std::unique_lock<std::mutex> lock(mtx_);
            while (!try_read(value, ret_val))
            {
                cv_.wait(lock);
            }
            return ret_val;
        }

        /**
         * Blocks until try_read() reports an outcome (value, end of feed, cancellation),
         * rethrows a stored exception, or the timeout elapses.
         *
         * @param[out] value    Receives the next value of the feed when one is available.
         * @param[in]  timeout  Maximum time to wait, measured on the steady clock from the moment of the call.
         *
         * @return true when a value was stored in @c value, false otherwise.
         *
         * @throw frpc::RpcTimeoutException when the timeout elapses before any outcome is available.
         */
        bool read(
                int32_t& value,
                const fdds::Duration_t& timeout) override
        {
            bool ret_val = false;
            std::unique_lock<std::mutex> lock(mtx_);
            const std::chrono::steady_clock::time_point end_time =
                std::chrono::steady_clock::now() +
                std::chrono::seconds(timeout.seconds) +
                std::chrono::nanoseconds(timeout.nanosec);
            while (!try_read(value, ret_val))
            {
                if (std::cv_status::timeout == cv_.wait_until(lock, end_time))
                {
                    // The deadline has passed. Waiting again would return immediately and spin forever,
                    // so perform one last check (a reply may have arrived right at the deadline) and give up.
                    if (try_read(value, ret_val))
                    {
                        break;
                    }
                    throw frpc::RpcTimeoutException();
                }
            }
            return ret_val;
        }

        /**
         * Cancels the operation.
         *
         * Only the first call has any effect: it marks the reader as cancelled, wakes up pending
         * read calls, sends a cancellation request to the server, and then blocks until the
         * operation is marked as finished.
         *
         * @throw frpc::RpcTimeoutException when sending the cancellation request times out.
         * @throw frpc::RpcBrokenPipeException when sending the cancellation request fails for any other reason.
         */
        void cancel() override
        {
            bool old_cancelled = false;
            {
                // The flag is changed with the mutex held so that a reader that has just evaluated
                // try_read() cannot miss the notification below before it starts waiting.
                std::lock_guard<std::mutex> guard(mtx_);
                old_cancelled = cancelled_.exchange(true);
            }

            if (old_cancelled)
            {
                // Another call already performed the cancellation
                return;
            }

            // Notify read calls that the operation was cancelled
            cv_.notify_all();

            // Communicate the cancellation to the server
            RequestType request;
            request.feed_cancel_ = true;
            frpc::RequestInfo req_info = info;
            auto ret = requester_->send_request(static_cast<void*>(&request), req_info);
            if (ret != fdds::RETCODE_OK)
            {
                if (ret == fdds::RETCODE_TIMEOUT)
                {
                    throw frpc::RpcTimeoutException();
                }
                else
                {
                    throw frpc::RpcBrokenPipeException(false);
                }
            }

            // Wait for the server to acknowledge the cancellation
            std::unique_lock<std::mutex> lock(mtx_);
            while (!finished_)
            {
                cv_.wait(lock);
            }
        }

    private:

        /**
         * Single, non-blocking attempt to obtain an outcome for a read call.
         *
         * Both read() overloads invoke it while holding @c mtx_. Outcomes are checked in this
         * order: cancellation, queued value, stored exception, end of feed.
         *
         * @param[out] value    Receives the front of the queue when a value is available.
         * @param[out] ret_val  Set to true when @c value was filled, false on cancellation or end of feed.
         *                      Left untouched when no outcome is available yet.
         *
         * @return true when the caller can stop waiting, false when it has to wait for more data.
         *
         * Rethrows the stored exception (clearing it first) when there is one and the queue is empty.
         */
        bool try_read(
                int32_t& value,
                bool& ret_val)
        {
            // A cancelled operation delivers nothing else, even if values are still queued
            if (cancelled_.load())
            {
                ret_val = false;
                return true;
            }

            // Hand over the oldest pending value, if any
            const bool has_pending_value = !queue_.empty();
            if (has_pending_value)
            {
                value = queue_.front();
                queue_.pop();
                ret_val = true;
                return true;
            }

            // Report a stored failure exactly once
            if (exception_)
            {
                const std::exception_ptr pending = exception_;
                exception_ = nullptr;
                std::rethrow_exception(pending);
            }

            // Nothing to report yet: the caller has to wait for a new value
            if (!finished_)
            {
                return false;
            }

            // The feed has ended
            ret_val = false;
            return true;
        }

        // Requester used by cancel() to send the cancellation request (stored as a raw pointer)
        frpc::Requester* requester_ = nullptr;
        // Set once by cancel(); checked by try_read() and before queueing received values
        std::atomic<bool> cancelled_{false};
        // Failure stored by set_exception(); rethrown and cleared by try_read()
        std::exception_ptr exception_;
        // Received values waiting to be consumed by the read calls
        std::queue<int32_t> queue_;
        // True once the feed ended or an exception was stored; accessed with mtx_ held
        bool finished_ = false;
        // Mutex held by the visible code while accessing queue_, exception_ and finished_
        std::mutex mtx_;
        // Notified when a value is queued, the operation finishes, an exception is set, or cancel() is called
        std::condition_variable cv_;
        // NOTE(review): presumably the GUID of the writer whose replies are accepted; it is assigned
        // outside the visible part of this struct - confirm against process_reply()
        frtps::GUID_t writer_id_ = frtps::GUID_t::unknown();

    };

    /**
     * Client-side writer for the `value` input feed of the `accumulator` operation.
     *
     * Every call builds a request holding a feed element and sends it through the requester,
     * reusing the request information kept by the associated reply processor.
     */
    struct accumulator_value_writer : public frpc::RpcClientWriter<int32_t>
    {
        using FeedType = calculator_example::detail::Calculator_accumulator_value_Feed;

        /**
         * @param requester  Requester used to send the feed elements.
         * @param result     Reply processor whose @c info is attached to every request sent.
         */
        accumulator_value_writer(
                frpc::Requester* requester,
                std::shared_ptr<IReplyProcessor> result)
            : requester_(requester)
            , result_(result)
        {
        }

        /// Sends @c value as the next element of the feed.
        void write(
                const int32_t& value) override
        {
            send_value(value);
        }

        /// Sends @c value as the next element of the feed.
        void write(
                int32_t&& value) override
        {
            send_value(value);
        }

        /// Sends a feed element carrying only the finish status @c reason.
        void finish(
                frpc::RpcStatusCode reason = frpc::RPC_STATUS_CODE_OK) override
        {
            RequestType feed_request;
            feed_request.accumulator_value = FeedType{};
            feed_request.accumulator_value->finished_ = reason;
            send_request(feed_request);
        }

    private:

        /// Common implementation of both write() overloads.
        void send_value(
                const int32_t& value)
        {
            RequestType feed_request;
            feed_request.accumulator_value = FeedType{};
            feed_request.accumulator_value->value = value;
            send_request(feed_request);
        }

        /**
         * Sends @c request using a copy of the request information of the reply processor.
         *
         * @throw frpc::RpcTimeoutException when the requester reports a timeout.
         * @throw frpc::RpcBrokenPipeException when the requester reports any other error.
         */
        void send_request(
                const RequestType& request)
        {
            frpc::RequestInfo request_info = result_->info;
            void* payload = static_cast<void*>(const_cast<RequestType*>(&request));
            const auto status = requester_->send_request(payload, request_info);
            if (status == fdds::RETCODE_OK)
            {
                return;
            }

            if (status == fdds::RETCODE_TIMEOUT)
            {
                throw frpc::RpcTimeoutException();
            }

            throw frpc::RpcBrokenPipeException(false);
        }

        frpc::Requester* requester_ = nullptr;
        std::shared_ptr<IReplyProcessor> result_;
    };


    //} operation accumulator
 
    //{ operation filter

public:

    /**
     * Starts a `filter` operation.
     *
     * @param[out] value        Replaced with a writer through which the caller sends the input feed.
     * @param[in]  filter_kind  Kind of filter copied into the request.
     *
     * @return The value returned by send_request_with_reader() for the reader that collects the results.
     */
    std::shared_ptr<eprosima::fastdds::dds::rpc::RpcClientReader<int32_t> > filter(
            /*in*/ std::shared_ptr<eprosima::fastdds::dds::rpc::RpcClientWriter<int32_t>>& value,
            /*in*/ calculator_example::FilterKind filter_kind) override
    {
        // Reader that will collect the values returned by the server
        auto reader = std::make_shared<filter_reader>(requester_);

        // Writer handed back to the caller, bound to the reader created above
        value = std::make_shared<filter_value_writer>(requester_, reader);

        // Build the request that starts the operation
        RequestType start_request;
        start_request.filter = calculator_example::detail::Calculator_filter_In{};
        start_request.filter->filter_kind = filter_kind;

        return send_request_with_reader(start_request, reader);
    }

private:

    /**
     * Client-side reader for the output feed of the `filter` operation.
     *
     * Replies are handed over through process_reply(), which queues the received values and
     * records the end of the feed. The read() overloads consume the queue, and cancel() asks
     * the server to stop the operation.
     */
    struct filter_reader
        : public frpc::RpcClientReader<int32_t>
        , public IReplyProcessor
        , public IExceptionHolder
    {
        /**
         * @param requester  Requester used by cancel() to send the cancellation request.
         */
        filter_reader(
                frpc::Requester* requester)
            : requester_(requester)
        {
        }

        /**
         * Processes one reply received by the client.
         *
         * Replies related to a different request, or coming from a writer other than the first
         * one accepted, are ignored.
         *
         * @param[in]  reply          Reply to process.
         * @param[in]  req_info       Information of the received reply.
         * @param[out] should_remove  Set to false when the reply was ignored or when a value was
         *                            received (further values are expected), true otherwise.
         */
        void process_reply(
                const ReplyType& reply,
                const frpc::RequestInfo& req_info,
                bool& should_remove) override
        {
            should_remove = false;
            // Check if the reply is for this operation
            if (req_info.related_sample_identity != info.related_sample_identity)
            {
                return;
            }

            // Avoid processing replies from different writers
            if (frtps::GUID_t::unknown() == writer_id_)
            {
                // First accepted reply: remember who sent it
                writer_id_ = req_info.sample_identity.writer_guid();
            }
            else if (writer_id_ != req_info.sample_identity.writer_guid())
            {
                return;
            }

            should_remove = true;
            if (!validate_reply(*this, reply))
            {
                return;
            }

            if (reply.remoteEx.has_value())
            {
                set_remote_exception(*this, reply.remoteEx.value());
                return;
            }

            if (reply.filter.has_value())
            {
                const auto& result = reply.filter.value();
                if (result.result.has_value())
                {
                    const auto& out = result.result.value();
                    {
                        if (out.finished_.has_value())
                        {
                            // Store the finished value
                            std::lock_guard<std::mutex> _(mtx_);
                            finished_ = true;
                            cv_.notify_all();
                        }
                        else if (out.return_.has_value())
                        {
                            if (!cancelled_)
                            {
                                // Store the return value
                                std::lock_guard<std::mutex> _(mtx_);
                                queue_.push(out.return_.value());
                                cv_.notify_all();
                            }

                            // Should be kept in order to receive further values
                            should_remove = false;
                        }
                        else
                        {
                            set_invalid_reply(*this);
                        }
                    }
                    return;
                }
            }

            // If we reach this point, the reply is for another operation
            set_invalid_reply(*this);
        }

        /**
         * Stores a failure for this operation and wakes up every waiting thread.
         *
         * Ignored when the operation has already been marked as finished.
         *
         * @param exception  Exception that try_read() will rethrow once the queue is drained.
         */
        void set_exception(
                std::exception_ptr exception) override
        {
            std::lock_guard<std::mutex> _(mtx_);
            if (!finished_)
            {
                exception_ = exception;
                finished_ = true;
                cv_.notify_all();
            }
        }

        /**
         * Blocks until try_read() reports an outcome (value, end of feed, cancellation),
         * or until it rethrows a stored exception.
         *
         * @param[out] value  Receives the next value of the feed when one is available.
         *
         * @return true when a value was stored in @c value, false otherwise.
         */
        bool read(
                int32_t& value) override
        {
            bool ret_val = false;
            std::unique_lock<std::mutex> lock(mtx_);
            while (!try_read(value, ret_val))
            {
                cv_.wait(lock);
            }
            return ret_val;
        }

        /**
         * Same as the blocking read(), but gives up once @c timeout has elapsed.
         *
         * @param[out] value    Receives the next value of the feed when one is available.
         * @param[in]  timeout  Maximum time to wait, measured on the steady clock from the moment of the call.
         *
         * @return true when a value was stored in @c value, false otherwise.
         *
         * @throw frpc::RpcTimeoutException when the timeout elapses before any outcome is available.
         */
        bool read(
                int32_t& value,
                const fdds::Duration_t& timeout) override
        {
            bool ret_val = false;
            std::unique_lock<std::mutex> lock(mtx_);
            const std::chrono::steady_clock::time_point end_time =
                std::chrono::steady_clock::now() +
                std::chrono::seconds(timeout.seconds) +
                std::chrono::nanoseconds(timeout.nanosec);
            while (!try_read(value, ret_val))
            {
                if (std::cv_status::timeout == cv_.wait_until(lock, end_time))
                {
                    // The deadline has passed. Waiting again would return immediately and spin forever,
                    // so perform one last check (a reply may have arrived right at the deadline) and give up.
                    if (try_read(value, ret_val))
                    {
                        break;
                    }
                    throw frpc::RpcTimeoutException();
                }
            }
            return ret_val;
        }

        /**
         * Cancels the operation.
         *
         * Only the first call has any effect: it marks the reader as cancelled, wakes up pending
         * read calls, sends a cancellation request to the server, and then blocks until the
         * operation is marked as finished.
         *
         * @throw frpc::RpcTimeoutException when sending the cancellation request times out.
         * @throw frpc::RpcBrokenPipeException when sending the cancellation request fails for any other reason.
         */
        void cancel() override
        {
            bool old_cancelled = false;
            {
                // The flag is changed with the mutex held so that a reader that has just evaluated
                // try_read() cannot miss the notification below before it starts waiting.
                std::lock_guard<std::mutex> guard(mtx_);
                old_cancelled = cancelled_.exchange(true);
            }

            if (old_cancelled)
            {
                // Another call already performed the cancellation
                return;
            }

            // Notify read calls that the operation was cancelled
            cv_.notify_all();

            // Communicate the cancellation to the server
            RequestType request;
            request.feed_cancel_ = true;
            frpc::RequestInfo req_info = info;
            auto ret = requester_->send_request(static_cast<void*>(&request), req_info);
            if (ret != fdds::RETCODE_OK)
            {
                if (ret == fdds::RETCODE_TIMEOUT)
                {
                    throw frpc::RpcTimeoutException();
                }
                else
                {
                    throw frpc::RpcBrokenPipeException(false);
                }
            }

            // Wait for the server to acknowledge the cancellation
            std::unique_lock<std::mutex> lock(mtx_);
            while (!finished_)
            {
                cv_.wait(lock);
            }
        }

    private:

        /**
         * Single, non-blocking attempt to obtain an outcome for a read call.
         *
         * Both read() overloads invoke it while holding @c mtx_. Outcomes are checked in this
         * order: cancellation, queued value, stored exception, end of feed.
         *
         * @param[out] value    Receives the front of the queue when a value is available.
         * @param[out] ret_val  Set to true when @c value was filled, false on cancellation or end of feed.
         *
         * @return true when the caller can stop waiting, false when it has to wait for more data.
         */
        bool try_read(
                int32_t& value,
                bool& ret_val)
        {
            // Early exit if the operation was cancelled
            if (cancelled_)
            {
                ret_val = false;
                return true;
            }

            // Retrieve one value from the queue
            if (!queue_.empty())
            {
                value = queue_.front();
                queue_.pop();
                ret_val = true;
                return true;
            }

            // Throw the exception if it was set (it is cleared first, so it is reported only once)
            if (exception_)
            {
                auto ex = exception_;
                exception_ = nullptr;
                std::rethrow_exception(ex);
            }

            // Check if the operation was finished
            if (finished_)
            {
                ret_val = false;
                return true;
            }

            // Need to wait for a new value
            return false;
        }

        // Requester used by cancel() to send the cancellation request
        frpc::Requester* requester_ = nullptr;
        // Set once by cancel(); checked by try_read() and before queueing received values
        std::atomic<bool> cancelled_{false};
        // Failure stored by set_exception(); rethrown and cleared by try_read()
        std::exception_ptr exception_;
        // Received values waiting to be consumed by the read calls
        std::queue<int32_t> queue_;
        // True once the feed ended or an exception was stored; accessed with mtx_ held
        bool finished_ = false;
        std::mutex mtx_;
        // Notified when a value is queued, the operation finishes, an exception is set, or cancel() is called
        std::condition_variable cv_;
        // GUID of the writer of the first accepted reply; replies from other writers are ignored
        frtps::GUID_t writer_id_ = frtps::GUID_t::unknown();

    };

    /**
     * Client-side writer for the `value` input feed of the `filter` operation.
     *
     * Every call builds a request holding a feed element and sends it through the requester,
     * reusing the request information kept by the associated reply processor.
     */
    struct filter_value_writer : public frpc::RpcClientWriter<int32_t>
    {
        using FeedType = calculator_example::detail::Calculator_filter_value_Feed;

        /**
         * @param requester  Requester used to send the feed elements.
         * @param result     Reply processor whose @c info is attached to every request sent.
         */
        filter_value_writer(
                frpc::Requester* requester,
                std::shared_ptr<IReplyProcessor> result)
            : requester_(requester)
            , result_(result)
        {
        }

        /// Sends @c value as the next element of the feed.
        void write(
                const int32_t& value) override
        {
            send_value(value);
        }

        /// Sends @c value as the next element of the feed.
        void write(
                int32_t&& value) override
        {
            send_value(value);
        }

        /// Sends a feed element carrying only the finish status @c reason.
        void finish(
                frpc::RpcStatusCode reason = frpc::RPC_STATUS_CODE_OK) override
        {
            RequestType feed_request;
            feed_request.filter_value = FeedType{};
            feed_request.filter_value->finished_ = reason;
            send_request(feed_request);
        }

    private:

        /// Common implementation of both write() overloads.
        void send_value(
                const int32_t& value)
        {
            RequestType feed_request;
            feed_request.filter_value = FeedType{};
            feed_request.filter_value->value = value;
            send_request(feed_request);
        }

        /**
         * Sends @c request using a copy of the request information of the reply processor.
         *
         * @throw frpc::RpcTimeoutException when the requester reports a timeout.
         * @throw frpc::RpcBrokenPipeException when the requester reports any other error.
         */
        void send_request(
                const RequestType& request)
        {
            frpc::RequestInfo request_info = result_->info;
            void* payload = static_cast<void*>(const_cast<RequestType*>(&request));
            const auto status = requester_->send_request(payload, request_info);
            if (status == fdds::RETCODE_OK)
            {
                return;
            }

            if (status == fdds::RETCODE_TIMEOUT)
            {
                throw frpc::RpcTimeoutException();
            }

            throw frpc::RpcBrokenPipeException(false);
        }

        frpc::Requester* requester_ = nullptr;
        std::shared_ptr<IReplyProcessor> result_;
    };


    //} operation filter
 

};

}  // namespace detail

/**
 * Creates a client for the Calculator interface.
 *
 * @param part          Participant forwarded to the client constructor.
 * @param service_name  Name of the service forwarded to the client constructor.
 * @param qos           Requester QoS forwarded to the client constructor.
 *
 * @return Shared pointer to the new client, exposed through the Calculator interface.
 */
std::shared_ptr<Calculator> create_CalculatorClient(
        eprosima::fastdds::dds::DomainParticipant& part,
        const char* service_name,
        const eprosima::fastdds::dds::RequesterQos& qos)
{
    using ClientImpl = detail::CalculatorClient;

    std::shared_ptr<Calculator> client = std::make_shared<ClientImpl>(part, service_name, qos);
    return client;
}

//} interface CalculatorClient


} // namespace calculator_example
